package org.yuanzheng.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.yuanzheng.source.StationLog

/**
 * Demo job showing a full-window [[ProcessWindowFunction]]: it counts how many
 * records arrive per `sid` key in each 5-second tumbling processing-time window
 * and prints one `(sid, count)` pair per key and window.
 *
 * Input is read as text lines from a socket. Each line is split on commas and must
 * contain at least six fields, the last two of which are parsed as `Long`.
 *
 * @author yuanzheng
 * @date 2020/6/21-17:36
 */
object TestProcessWindowFunction {
  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Brings DataStream and the implicit type information needed by map/keyBy/process into scope.
    import org.apache.flink.streaming.api.scala._

    // Read the data source: one comma-separated record per line.
    // A line with fewer than six fields, or with a non-numeric value in the last
    // two fields, makes this map function throw.
    val stream: DataStream[StationLog] = streamEnv.socketTextStream("192.168.1.8", 8888)
      .map { line =>
        val fields = line.split(",")
        new StationLog(
          fields(0).trim,
          fields(1).trim,
          fields(2).trim,
          fields(3).trim,
          fields(4).trim.toLong,
          fields(5).trim.toLong
        )
      }

    // Open the window: key by sid and group records into 5-second tumbling windows.
    stream
      .map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
        // Invoked once per key when a window ends; `elements` holds every record
        // of that key collected during the window.
        override def process(
            key: String,
            context: Context,
            elements: Iterable[(String, Int)],
            out: Collector[(String, Long)]): Unit = {
          // Emit the number of records seen for this key in the window.
          out.collect((key, elements.size.toLong))
        }
      })
      .print()

    streamEnv.execute()
  }
}
